/*
 * Copyright 2019 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.netflix.mantis.samples;

import com.netflix.mantis.samples.proto.AggregationReport;
import com.netflix.mantis.samples.proto.RequestAggregation;
import com.netflix.mantis.samples.proto.RequestEvent;
import com.netflix.mantis.samples.source.RandomRequestSource;
import com.netflix.mantis.samples.stage.AggregationStage;
import com.netflix.mantis.samples.stage.CollectStage;
import com.netflix.mantis.samples.stage.GroupByStage;
import io.mantisrx.runtime.Config;
import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.MantisJobProvider;
import io.mantisrx.runtime.Metadata;
import io.mantisrx.runtime.core.MantisStream;
import io.mantisrx.runtime.core.WindowSpec;
import io.mantisrx.runtime.core.functions.ReduceFunction;
import io.mantisrx.runtime.core.sinks.ObservableSinkImpl;
import io.mantisrx.runtime.core.sources.ObservableSourceImpl;
import io.mantisrx.runtime.executor.LocalJobExecutorNetworked;
import io.mantisrx.runtime.sink.Sinks;
import io.mantisrx.shaded.com.fasterxml.jackson.core.JsonProcessingException;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;


/**
 * This sample demonstrates the use of a multi-stage job in Mantis. Multi-stage jobs are useful when a single
 * container is incapable of processing the entire stream of events.
 * Each stage represents one of these types of
 * computations Scalar->Scalar, Scalar->Group, Group->Scalar, Group->Group.
 *
 * At deploy time the user can configure the number of workers for each stage and the resource requirements for each worker.
 * This sample has 3 stages
 * 1. {@link GroupByStage} Receives the raw events, groups them by their category and sends it to the workers of stage 2 in such a way
 * that all events for a particular group will land on the exact same worker of stage 2.
 * 2. {@link AggregationStage} Receives events tagged by their group from the previous stage. Windows over them and
 * sums up the counts of each group it has seen.
 * 3. {@link CollectStage} Receives the aggregates generated by the previous stage, collects them over a window and
 * generates a consolidated report which is sent to the default Server Sent Event (SSE) sink.
 *
 * Run this sample by executing the main method of this class, or from the command line with
 * {@code ../gradlew execute}. Then look for the SSE port where the output of this job will be available
 * for streaming, e.g. {@code Serving modern HTTP SSE server sink on port: 8299}.
 */

@Slf4j
public class RequestAggregationDslJob extends MantisJobProvider<String> {

    private static final ObjectMapper mapper = new ObjectMapper();

    @Override
    public Job<String> getJobInstance() {
        String groupByParam = "path";
        Config<String> jobConfig = MantisStream.create(null)
            .source(new ObservableSourceImpl<>(new RandomRequestSource()))
            .keyBy(x -> {
                if ("path".equalsIgnoreCase(groupByParam)) {
                    return x.getRequestPath();
                } else {
                    return x.getIpAddress();
                }
            })
            .window(WindowSpec.timed(Duration.ofSeconds(5)))
            .reduce(new ReduceFunction<RequestEvent, RequestAggregation>() {
                @Override
                public RequestAggregation initialValue() {
                    return RequestAggregation.builder().build();
                }

                @Override
                public RequestAggregation reduce(RequestAggregation acc, RequestEvent requestEvent) {
                    // TODO(hmittal): Need access to key-by key
                    return RequestAggregation.builder()
                        .path(requestEvent.getRequestPath())
                        .count(acc.getCount() + requestEvent.getLatency())
                        .build();
                }
            })
            .materialize()
            .keyBy(x -> "")
            .window(WindowSpec.timed(Duration.ofSeconds(5)))
            .reduce(new ReduceFunction<RequestAggregation, AggregationReport>() {
                @Override
                public AggregationReport initialValue() {
                    return new AggregationReport(new ConcurrentHashMap<>());
                }

                @Override
                public AggregationReport reduce(AggregationReport acc, RequestAggregation item) {
                    if (item != null && item.getPath() != null) {
                        acc.getPathToCountMap().put(item.getPath(), item.getCount());
                    }
                    return acc;
                }
            })
            .map(report -> {
                try {
                    return mapper.writeValueAsString(report);
                } catch (JsonProcessingException e) {
                    log.error(e.getMessage());
                    return null;
                }
            })
            .filter(Objects::nonNull)
            // Reuse built in sink that eagerly subscribes and delivers data over SSE
            .sink(new ObservableSinkImpl<>(Sinks.sysout()));

        return jobConfig
            .metadata(new Metadata.Builder()
                .name("GroupByPath")
                .description("Connects to a random data generator source"
                    + " and counts the number of requests for each uri within a window")
                .build())
            .create();
    }

    public static void main(String[] args) {
        // To run locally we use the LocalJobExecutor
        LocalJobExecutorNetworked.execute(new RequestAggregationDslJob().getJobInstance());
    }
}
